home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyc (Python 2.6)
-
- import os
- import sys
- import math
- import gobject
- import pygst
- pygst.require('0.10')
- import gst
- import utils
- from pygobject import gsignal
- import sources
- from sources import EOS, ERROR, UNKNOWN_TYPE, WRONG_TYPE
-
- class Leveller(gst.Pipeline):
- """
- I am a pipeline that calculates RMS values and mix-in/out points.
- I will signal 'done' when I'm done scanning the file, with return value
- EOS, ERROR, UNKNOWN_TYPE, or WRONG_TYPE from gst.extend.sources
- """
- gsignal('done', str)
-
- def __init__(self, filename, threshold = -9):
- gst.Pipeline.__init__(self)
- self._filename = filename
- self._thresholddB = threshold
- self._threshold = math.pow(10, self._thresholddB / 10)
- self._source = sources.AudioSource(filename)
- self._source.connect('done', self._done_cb)
- self._level = gst.element_factory_make('level')
- self._fakesink = gst.element_factory_make('fakesink')
- self.add(self._source, self._level, self._fakesink)
- self._source.connect('pad-added', self._sourcePadAddedCb)
- self._level.link(self._fakesink)
- self._rmsdB = { }
- self._peakdB = 0
- self._meansquaresums = []
- self._peaksdB = []
- self._lasttime = 0
- self.mixin = 0
- self.mixout = 0
- self.length = 0
- self.rms = 0
- self.rmsdB = 0
-
-
- def _sourcePadAddedCb(self, source, pad):
- self._source.link(self._level)
-
-
- def do_handle_message(self, message):
- self.debug('got message %r' % message)
- if message.type == gst.MESSAGE_ELEMENT and message.src == self._level:
- struc = message.structure
- endtime = struc['endtime']
- rmss = struc['rms']
- peaks = struc['peak']
- decays = struc['decay']
- infos = zip(rmss, peaks, decays)
- channid = 0
- for rms, peak, decay in infos:
- self._level_cb(message.src, endtime, channid, rms, peak, decay)
- channid += 1
-
- elif message.type == gst.MESSAGE_EOS:
- self._eos_cb(message.src)
-
- gst.Pipeline.do_handle_message(self, message)
-
-
- def _level_cb(self, element, time, channel, rmsdB, peakdB, decaydB):
- if time > self._lasttime and self._lasttime > 0:
- meansquaresum = 0
- for i in self._rmsdB.keys():
- meansquaresum += math.pow(10, self._rmsdB[i] / 10)
-
- meansquaresum /= len(self._rmsdB.keys())
-
- try:
- rmsdBstr = str(10 * math.log10(meansquaresum))
- except OverflowError:
- rmsdBstr = '(-inf)'
-
- gst.log('meansquaresum %f (%s dB)' % (meansquaresum, rmsdBstr))
- self._peaksdB.append((self._lasttime, peakdB))
- self._meansquaresums.append((self._lasttime, meansquaresum))
- self._rmsdB = { }
- self._peakdB = 0
-
- gst.log('time %s, channel %d, rmsdB %f' % (gst.TIME_ARGS(time), channel, rmsdB))
- self._lasttime = time
- self._rmsdB[channel] = rmsdB
- if peakdB > self._peakdB:
- self._peakdB = peakdB
-
-
-
- def _done_cb(self, source, reason):
- gst.debug('done, reason %s' % reason)
- if reason == EOS:
- return None
- self.emit('done', reason)
-
-
- def _eos_cb(self, source):
- gst.debug('eos, start calcing')
- highestdB = self._peaksdB[0][1]
- for time, peakdB in self._peaksdB:
- if peakdB > highestdB:
- highestdB = peakdB
- continue
-
- gst.debug('highest peak(dB): %f' % highestdB)
- (self.length, peakdB) = self._peaksdB[-1]
- for time, peakdB in self._peaksdB:
- gst.log('time %s, peakdB %f' % (gst.TIME_ARGS(time), peakdB))
- if peakdB > self._thresholddB + highestdB:
- gst.debug('found mix-in point of %f dB at %s' % (peakdB, gst.TIME_ARGS(time)))
- self.mixin = time
- break
- continue
-
- self._peaksdB.reverse()
- found = None
- for time, peakdB in self._peaksdB:
- if found:
- self.mixout = time
- gst.debug('found mix-out point of %f dB right before %s' % (found, gst.TIME_ARGS(time)))
- break
-
- if peakdB > self._thresholddB + highestdB:
- found = peakdB
- continue
-
- weightedsquaresums = 0
- lasttime = self.mixin
- for time, meansquaresum in self._meansquaresums:
- if time <= self.mixin:
- continue
-
- delta = time - lasttime
- weightedsquaresums += meansquaresum * delta
- gst.log('added MSS %f over time %s at time %s, now %f' % (meansquaresum, gst.TIME_ARGS(delta), gst.TIME_ARGS(time), weightedsquaresums))
- lasttime = time
- if time > self.mixout:
- break
- continue
-
-
- try:
- ms = weightedsquaresums / (self.mixout - self.mixin)
- except ZeroDivisionError:
- gst.warning('ZeroDivisionError on %s, mixin %s, mixout %s' % (self._filename, gst.TIME_ARGS(self.mixin), gst.TIME_ARGS(self.mixout)))
- self.emit('done', WRONG_TYPE)
- return None
-
- self.rms = math.sqrt(ms)
- self.rmsdB = 10 * math.log10(ms)
- self.emit('done', EOS)
-
-
- def start(self):
- gst.debug('Setting to PLAYING')
- self.set_state(gst.STATE_PLAYING)
- gst.debug('Set to PLAYING')
-
-
- def stop(self):
- """
- Stop the leveller, freeing all resources.
- Call after the leveller emitted 'done' to clean up.
- """
- gst.debug('Setting to NULL')
- self.set_state(gst.STATE_NULL)
- gst.debug('Set to NULL')
- utils.gc_collect('Leveller.stop()')
-
-
- def clean(self):
- self.stop()
- self.remove(self._source)
- self.remove(self._level)
- self.remove(self._fakesink)
- gst.debug('Emptied myself')
- self._source.clean()
- utils.gc_collect('Leveller.clean() cleaned up source')
- self._source = None
- self._fakesink = None
- self._level = None
- utils.gc_collect('Leveller.clean() done')
-
-
- gobject.type_register(Leveller)
- if __name__ == '__main__':
- main = gobject.MainLoop()
-
- try:
- leveller = Leveller(sys.argv[1])
- except IndexError:
- sys.stderr.write('Please give a file to calculate level of\n')
- sys.exit(1)
-
- print 'Starting'
- bus = leveller.get_bus()
- bus.add_signal_watch()
- dontstop = True
- leveller.set_state(gst.STATE_PLAYING)
- while dontstop:
- message = bus.poll(gst.MESSAGE_ANY, gst.SECOND)
- if message:
- gst.debug('got message from poll:%s/%r' % (message.type, message))
- else:
- gst.debug('got NOTHING from poll')
- if message:
- if message.type == gst.MESSAGE_EOS:
- print 'in: %s, out: %s, length: %s' % (gst.TIME_ARGS(leveller.mixin), gst.TIME_ARGS(leveller.mixout), gst.TIME_ARGS(leveller.length))
- print 'rms: %f, %f dB' % (leveller.rms, leveller.rmsdB)
- dontstop = False
- elif message.type == gst.MESSAGE_ERROR:
- (error, debug) = message.parse_error()
- print 'ERROR[%s] %s' % (error.domain, error.message)
- dontstop = False
-
- message.type == gst.MESSAGE_EOS
- leveller.stop()
- leveller.clean()
- gst.debug('deleting leveller, verify objects are freed')
- utils.gc_collect('quit main loop')
- del leveller
- utils.gc_collect('deleted leveller')
- gst.debug('stopping forever')
-
-